import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow.keras import utils
from tqdm import tqdm
def get_batch():
global BATCH_START, TIME_STEPS
# xs shape (50batch, 20steps)
xs = np.arange(BATCH_START, BATCH_START+TIME_STEPS*BATCH_SIZE).reshape((BATCH_SIZE, TIME_STEPS)) / (10*np.pi)
seq = np.sin(xs)
res = np.cos(xs)
BATCH_START += TIME_STEPS
return [seq[:, :, np.newaxis], res[:, :, np.newaxis], xs]
BATCH_START = 0
TIME_STEPS = 20
INPUT_SIZE = 1
OUTPUT_SIZE = 1
BATCH_SIZE = 50
CELL_SIZE = 20
LR = 0.001
model = tf.keras.models.Sequential()
model.add(layers.LSTM(CELL_SIZE,
batch_input_shape=(BATCH_SIZE, TIME_STEPS, INPUT_SIZE), # Or: input_dim=INPUT_SIZE, input_length=TIME_STEPS,
return_sequences=True, # True: output at all steps. False: output as last step.
stateful=True, # True: the final state of batch1 is feed into the initial state of batch2
))
model.add(layers.TimeDistributed(layers.Dense(OUTPUT_SIZE)))
model.compile(optimizer = 'adam', loss = 'mse')
for step in range(501):
# data shape = (batch_num, steps, inputs/outputs)
X_batch, Y_batch, xs = get_batch()
cost = model.train_on_batch(X_batch, Y_batch)
pred = model.predict(X_batch, BATCH_SIZE)
plt.plot(xs[0, :], Y_batch[0].flatten(), 'r', xs[0, :], pred.flatten()[:TIME_STEPS], 'b--')
plt.ylim((-1.2, 1.2))
plt.draw()
plt.pause(0.1)
if step % 10 == 0:
print('train cost: ', cost)